agentmux_srv\backend\storage/
agents_consolidate.rs

1// Copyright 2025-2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Phase 3a — one-shot backfill from `db_agent_definitions` +
5//! `db_agent_instances` into the new consolidated `db_agents` table.
6//!
7//! See `docs/specs/SPEC_AGENT_CONCEPT_CONSOLIDATION_2026_05_24.md`.
8//!
9//! Phase 3a is **write-only**: this migration populates `db_agents` so
10//! a later Phase 3b PR can flip readers over with full confidence the
11//! data is there. Phase 3c drops the old tables.
12//!
13//! Marker-file gated (`<data_dir>/migration_agents_consolidate_v1.flag`)
14//! so the backfill only runs once per data dir. Idempotent on second
15//! run (marker check short-circuits).
16//!
17//! Algorithm (per spec §"What `db_agent_instances` rows become"):
18//!
19//! 1. For each `db_agent_definitions WHERE is_seeded = 1`: INSERT a
20//!    template projection (`is_template = 1`, bindings empty).
21//!
22//! 2. For each `db_agent_definitions WHERE is_seeded = 0`: INSERT a
23//!    user-clone projection (`is_template = 0`, `parent_template_id =
24//!    parent_id`).
25//!
26//! 3. For each `db_agent_instances` row whose `definition_id` points at
27//!    a TEMPLATE: INSERT a new user-clone projection keyed by
28//!    `instance.id`, `parent_template_id = definition_id`, name +
29//!    bindings from the instance.
30//!
31//! 4. For each `db_agent_instances` row whose `definition_id` points at
32//!    an already-user-cloned definition: UPDATE the existing
33//!    user-clone projection (keyed by the definition id from pass 2) to
34//!    fold in the instance's bindings. If multiple instances point at
35//!    the same user-clone, the most-recent (`created_at` DESC) wins
36//!    and a warning is logged.
37//!
38//! Continuation rows (`parent_instance_id` non-empty) are skipped — the
39//! consolidated model has no place for them; they were the
40//! pre-Option-E continuation chain.
41
42use std::path::Path;
43
44use rusqlite::{params, Connection};
45use tracing::{info, warn};
46
47use super::error::StoreError;
48
49/// Marker filename. Lives in the data dir (one level above the `db/`
50/// subdir that holds `objects.db`).
51pub const CONSOLIDATE_MARKER: &str = "migration_agents_consolidate_v1.flag";
52
53/// Backfill statistics — useful for logs + tests.
54#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
55pub struct ConsolidateStats {
56    pub templates_inserted: usize,
57    pub user_defs_inserted: usize,
58    pub instances_as_clone_inserted: usize,
59    pub instances_folded_into_def: usize,
60    pub instances_skipped_continuation: usize,
61    pub instances_skipped_no_definition: usize,
62    pub instances_collision_warned: usize,
63    pub already_done: bool,
64}
65
66/// Run the one-shot consolidation backfill, gated by the marker file.
67///
68/// `data_dir` is the directory that holds the marker file (typically
69/// the parent of the `db/` directory). Pass `None` to skip marker
70/// gating — only intended for tests + in-memory stores.
71///
72/// Returns `Ok(stats)` on success (incl. the marker-already-present
73/// short-circuit). Failures roll back the active transaction and
74/// return the underlying SQLite error; the marker is NOT written.
75pub fn run_consolidate_migration(
76    conn: &mut Connection,
77    data_dir: Option<&Path>,
78) -> Result<ConsolidateStats, StoreError> {
79    // Marker gate.
80    if let Some(dir) = data_dir {
81        let marker = dir.join(CONSOLIDATE_MARKER);
82        if marker.exists() {
83            return Ok(ConsolidateStats {
84                already_done: true,
85                ..Default::default()
86            });
87        }
88    }
89
90    // The backfill runs inside a transaction so a mid-flight failure
91    // leaves db_agents empty rather than half-populated. The dual-
92    // write call sites tolerate empty + idempotently upsert on the
93    // next mutation, so partial backfill state is recoverable.
94    let tx = conn.transaction()?;
95
96    let mut stats = ConsolidateStats::default();
97
98    // Pass 1 + 2 — definitions.
99    {
100        let mut def_stmt = tx.prepare(
101            "SELECT id, name, icon, provider, description,
102                    working_directory, shell, provider_flags, auto_start,
103                    restart_on_crash, idle_timeout_minutes, created_at,
104                    agent_type, environment, agent_bus_id, is_seeded,
105                    accounts, parent_id, branch_label, updated_at, slug
106             FROM db_agent_definitions",
107        )?;
108        let rows = def_stmt.query_map([], |row| {
109            Ok(DefRow {
110                id: row.get(0)?,
111                name: row.get(1)?,
112                icon: row.get(2)?,
113                provider: row.get(3)?,
114                description: row.get(4)?,
115                working_directory: row.get(5)?,
116                shell: row.get(6)?,
117                provider_flags: row.get(7)?,
118                auto_start: row.get(8)?,
119                restart_on_crash: row.get(9)?,
120                idle_timeout_minutes: row.get(10)?,
121                created_at: row.get(11)?,
122                agent_type: row.get(12)?,
123                environment: row.get(13)?,
124                agent_bus_id: row.get(14)?,
125                is_seeded: row.get(15)?,
126                accounts: row.get(16)?,
127                parent_id: row.get(17)?,
128                branch_label: row.get(18)?,
129                updated_at: row.get(19)?,
130                slug: row.get(20)?,
131            })
132        })?;
133        let mut defs: Vec<DefRow> = Vec::new();
134        for r in rows {
135            defs.push(r?);
136        }
137        drop(def_stmt);
138        for def in &defs {
139            let is_template = if def.is_seeded == 1 { 1_i64 } else { 0_i64 };
140            let parent_template_id = if def.is_seeded == 1 {
141                String::new()
142            } else {
143                def.parent_id.clone()
144            };
145            // Use INSERT OR REPLACE so a re-run after a partial state
146            // (e.g. a developer deleted the marker manually) doesn't
147            // explode on the PK; the user-clone-binding-fold path
148            // immediately below relies on definition rows existing.
149            tx.execute(
150                "INSERT OR REPLACE INTO db_agents (
151                    id, name, icon, description,
152                    is_template, parent_template_id,
153                    provider, provider_flags, shell, environment,
154                    agent_type, agent_bus_id, accounts,
155                    auto_start, restart_on_crash, idle_timeout_minutes,
156                    slug, branch_label,
157                    identity_id, memory_id, working_directory, github_context,
158                    instance_name,
159                    created_at, updated_at, is_seeded, user_hidden
160                 ) VALUES (
161                    ?1, ?2, ?3, ?4,
162                    ?5, ?6,
163                    ?7, ?8, ?9, ?10,
164                    ?11, ?12, ?13,
165                    ?14, ?15, ?16,
166                    ?17, ?18,
167                    '', '', '', '',
168                    '',
169                    ?19, ?20, ?21, 0
170                 )",
171                params![
172                    def.id,
173                    def.name,
174                    def.icon,
175                    def.description,
176                    is_template,
177                    parent_template_id,
178                    def.provider,
179                    def.provider_flags,
180                    def.shell,
181                    def.environment,
182                    def.agent_type,
183                    def.agent_bus_id,
184                    def.accounts,
185                    def.auto_start,
186                    def.restart_on_crash,
187                    def.idle_timeout_minutes,
188                    def.slug,
189                    def.branch_label,
190                    def.created_at,
191                    def.updated_at,
192                    def.is_seeded,
193                ],
194            )?;
195            if def.is_seeded == 1 {
196                stats.templates_inserted += 1;
197            } else {
198                stats.user_defs_inserted += 1;
199            }
200        }
201    }
202
203    // Pass 3 + 4 — instances.
204    // Order by created_at DESC so the FIRST instance we see for a
205    // given user-cloned def is the most recent (the spec wants
206    // most-recent bindings to win on collision).
207    let inst_rows: Vec<InstanceRow> = {
208        let mut stmt = tx.prepare(
209            "SELECT i.id, i.definition_id, i.parent_instance_id,
210                    i.instance_name, i.identity_id, i.memory_id,
211                    i.working_directory, i.github_context,
212                    i.created_at, i.display_hidden,
213                    d.is_seeded, d.name, d.icon, d.description,
214                    d.provider, d.provider_flags, d.shell, d.environment,
215                    d.agent_type, d.agent_bus_id, d.accounts,
216                    d.auto_start, d.restart_on_crash, d.idle_timeout_minutes,
217                    d.slug, d.branch_label
218             FROM db_agent_instances i
219             LEFT JOIN db_agent_definitions d ON d.id = i.definition_id
220             ORDER BY i.created_at DESC",
221        )?;
222        let iter = stmt.query_map([], |row| {
223            Ok(InstanceRow {
224                id: row.get(0)?,
225                definition_id: row.get(1)?,
226                parent_instance_id: row.get(2)?,
227                instance_name: row.get(3)?,
228                identity_id: row.get(4)?,
229                memory_id: row.get(5)?,
230                working_directory: row.get(6)?,
231                github_context: row.get(7)?,
232                created_at: row.get(8)?,
233                display_hidden: row.get::<_, i64>(9)? != 0,
234                def_is_seeded: row.get::<_, Option<i64>>(10)?.unwrap_or(0),
235                def_name: row.get::<_, Option<String>>(11)?.unwrap_or_default(),
236                def_icon: row.get::<_, Option<String>>(12)?.unwrap_or_default(),
237                def_description: row.get::<_, Option<String>>(13)?.unwrap_or_default(),
238                def_provider: row.get::<_, Option<String>>(14)?.unwrap_or_default(),
239                def_provider_flags: row.get::<_, Option<String>>(15)?.unwrap_or_default(),
240                def_shell: row.get::<_, Option<String>>(16)?.unwrap_or_default(),
241                def_environment: row.get::<_, Option<String>>(17)?.unwrap_or_default(),
242                def_agent_type: row
243                    .get::<_, Option<String>>(18)?
244                    .unwrap_or_else(|| "standalone".to_string()),
245                def_agent_bus_id: row.get::<_, Option<String>>(19)?.unwrap_or_default(),
246                def_accounts: row.get::<_, Option<String>>(20)?.unwrap_or_default(),
247                def_auto_start: row.get::<_, Option<i64>>(21)?.unwrap_or(0),
248                def_restart_on_crash: row.get::<_, Option<i64>>(22)?.unwrap_or(0),
249                def_idle_timeout_minutes: row.get::<_, Option<i64>>(23)?.unwrap_or(0),
250                def_slug: row.get::<_, Option<String>>(24)?.unwrap_or_default(),
251                def_branch_label: row.get::<_, Option<String>>(25)?.unwrap_or_default(),
252                def_present: row.get::<_, Option<i64>>(10)?.is_some(),
253            })
254        })?;
255        let mut out = Vec::new();
256        for r in iter {
257            out.push(r?);
258        }
259        out
260    };
261
262    // Track which user-clone def-ids already had their bindings folded.
263    // Multiple instances on the same user-clone-def → first wins (most
264    // recent because we ordered DESC); the rest get a warning.
265    let mut folded: std::collections::HashSet<String> = std::collections::HashSet::new();
266
267    for inst in &inst_rows {
268        if !inst.parent_instance_id.is_empty() {
269            stats.instances_skipped_continuation += 1;
270            continue;
271        }
272        if !inst.def_present {
273            // Orphaned instance — no definition row. The old schema's
274            // FK cascade would have removed this; if it survived,
275            // there's nothing to project against.
276            stats.instances_skipped_no_definition += 1;
277            warn!(
278                instance_id = %inst.id,
279                definition_id = %inst.definition_id,
280                "agents_consolidate: instance has no definition; skipping",
281            );
282            continue;
283        }
284        if inst.def_is_seeded == 1 {
285            // Instance of a TEMPLATE — INSERT a new user-clone row
286            // keyed by the instance id.
287            let name = if inst.instance_name.is_empty() {
288                inst.def_name.clone()
289            } else {
290                inst.instance_name.clone()
291            };
292            tx.execute(
293                "INSERT OR REPLACE INTO db_agents (
294                    id, name, icon, description,
295                    is_template, parent_template_id,
296                    provider, provider_flags, shell, environment,
297                    agent_type, agent_bus_id, accounts,
298                    auto_start, restart_on_crash, idle_timeout_minutes,
299                    slug, branch_label,
300                    identity_id, memory_id, working_directory, github_context,
301                    instance_name,
302                    created_at, updated_at, is_seeded, user_hidden
303                 ) VALUES (
304                    ?1, ?2, ?3, ?4,
305                    0, ?5,
306                    ?6, ?7, ?8, ?9,
307                    ?10, ?11, ?12,
308                    ?13, ?14, ?15,
309                    ?16, ?17,
310                    ?18, ?19, ?20, ?21,
311                    ?22,
312                    ?23, ?23, 0, ?24
313                 )",
314                params![
315                    inst.id,
316                    name,
317                    inst.def_icon,
318                    inst.def_description,
319                    inst.definition_id,
320                    inst.def_provider,
321                    inst.def_provider_flags,
322                    inst.def_shell,
323                    inst.def_environment,
324                    inst.def_agent_type,
325                    inst.def_agent_bus_id,
326                    inst.def_accounts,
327                    inst.def_auto_start,
328                    inst.def_restart_on_crash,
329                    inst.def_idle_timeout_minutes,
330                    inst.def_slug,
331                    inst.def_branch_label,
332                    inst.identity_id,
333                    inst.memory_id,
334                    inst.working_directory,
335                    inst.github_context,
336                    inst.instance_name,
337                    inst.created_at,
338                    if inst.display_hidden { 1_i64 } else { 0_i64 },
339                ],
340            )?;
341            stats.instances_as_clone_inserted += 1;
342        } else {
343            // Instance of an already-user-cloned definition — UPDATE
344            // the existing user-clone row (keyed by definition_id)
345            // to fold in the instance bindings. Collision: only the
346            // first (most recent) wins; warn on subsequent.
347            if folded.contains(&inst.definition_id) {
348                stats.instances_collision_warned += 1;
349                warn!(
350                    instance_id = %inst.id,
351                    definition_id = %inst.definition_id,
352                    "agents_consolidate: multiple instances on one user-cloned def; keeping most-recent bindings",
353                );
354                continue;
355            }
356            let name = if inst.instance_name.is_empty() {
357                inst.def_name.clone()
358            } else {
359                inst.instance_name.clone()
360            };
361            tx.execute(
362                "UPDATE db_agents SET
363                    name = ?1,
364                    identity_id = ?2,
365                    memory_id = ?3,
366                    working_directory = ?4,
367                    github_context = ?5,
368                    instance_name = ?6,
369                    user_hidden = ?7
370                 WHERE id = ?8 AND is_template = 0",
371                params![
372                    name,
373                    inst.identity_id,
374                    inst.memory_id,
375                    inst.working_directory,
376                    inst.github_context,
377                    inst.instance_name,
378                    if inst.display_hidden { 1_i64 } else { 0_i64 },
379                    inst.definition_id,
380                ],
381            )?;
382            folded.insert(inst.definition_id.clone());
383            stats.instances_folded_into_def += 1;
384        }
385    }
386
387    tx.commit()?;
388
389    // Marker written AFTER successful commit so a crash mid-backfill
390    // leaves the marker absent → next start retries from scratch.
391    if let Some(dir) = data_dir {
392        let marker = dir.join(CONSOLIDATE_MARKER);
393        if let Err(e) = std::fs::write(&marker, b"phase3a") {
394            // The data is in place; failing to write the marker just
395            // means the next startup will redo the work. Log + return
396            // success.
397            warn!(
398                error = %e,
399                marker = %marker.display(),
400                "agents_consolidate: failed to write marker; next startup will redo backfill",
401            );
402        }
403    }
404
405    info!(
406        templates_inserted = stats.templates_inserted,
407        user_defs_inserted = stats.user_defs_inserted,
408        instances_as_clone_inserted = stats.instances_as_clone_inserted,
409        instances_folded_into_def = stats.instances_folded_into_def,
410        instances_skipped_continuation = stats.instances_skipped_continuation,
411        instances_skipped_no_definition = stats.instances_skipped_no_definition,
412        instances_collision_warned = stats.instances_collision_warned,
413        "agents_consolidate: backfill completed",
414    );
415    Ok(stats)
416}
417
418/// Snapshot of one `db_agent_definitions` row, narrow projection
419/// matching what the backfill needs.
420struct DefRow {
421    id: String,
422    name: String,
423    icon: String,
424    provider: String,
425    description: String,
426    working_directory: String,
427    shell: String,
428    provider_flags: String,
429    auto_start: i64,
430    restart_on_crash: i64,
431    idle_timeout_minutes: i64,
432    created_at: i64,
433    agent_type: String,
434    environment: String,
435    agent_bus_id: String,
436    is_seeded: i64,
437    accounts: String,
438    parent_id: String,
439    branch_label: String,
440    updated_at: i64,
441    slug: String,
442}
443
444/// Snapshot of one `db_agent_instances` row plus the LEFT-JOINed
445/// definition fields the backfill copies into `db_agents`.
446struct InstanceRow {
447    id: String,
448    definition_id: String,
449    parent_instance_id: String,
450    instance_name: String,
451    identity_id: String,
452    memory_id: String,
453    working_directory: String,
454    github_context: String,
455    created_at: i64,
456    display_hidden: bool,
457    def_present: bool,
458    def_is_seeded: i64,
459    def_name: String,
460    def_icon: String,
461    def_description: String,
462    def_provider: String,
463    def_provider_flags: String,
464    def_shell: String,
465    def_environment: String,
466    def_agent_type: String,
467    def_agent_bus_id: String,
468    def_accounts: String,
469    def_auto_start: i64,
470    def_restart_on_crash: i64,
471    def_idle_timeout_minutes: i64,
472    def_slug: String,
473    def_branch_label: String,
474}
475
476#[cfg(test)]
477mod tests {
478    use super::*;
479    use crate::backend::storage::migrations::run_object_schema;
480
481    fn fresh_conn() -> Connection {
482        let conn = Connection::open_in_memory().unwrap();
483        conn.execute_batch("PRAGMA foreign_keys=ON;").unwrap();
484        run_object_schema(&conn).unwrap();
485        conn
486    }
487
488    fn insert_def(
489        conn: &Connection,
490        id: &str,
491        name: &str,
492        is_seeded: i64,
493        parent_id: &str,
494    ) {
495        conn.execute(
496            "INSERT INTO db_agent_definitions
497                (id, slug, name, icon, provider, description, working_directory, shell,
498                 provider_flags, auto_start, restart_on_crash, idle_timeout_minutes,
499                 created_at, agent_type, environment, agent_bus_id, is_seeded, accounts,
500                 parent_id, branch_label, updated_at)
501             VALUES (?1, ?2, ?3, '✦', 'claude', 'desc', '', 'bash',
502                     '', 0, 0, 0,
503                     ?4, 'standalone', '', '', ?5, '',
504                     ?6, '', ?4)",
505            params![id, id, name, 1000_i64, is_seeded, parent_id],
506        )
507        .unwrap();
508    }
509
510    fn insert_instance(
511        conn: &Connection,
512        id: &str,
513        definition_id: &str,
514        instance_name: &str,
515        identity_id: &str,
516        memory_id: &str,
517        working_directory: &str,
518        created_at: i64,
519        display_hidden: bool,
520    ) {
521        conn.execute(
522            "INSERT INTO db_agent_instances
523                (id, definition_id, parent_instance_id, block_id, session_id, status,
524                 github_context, started_at, ended_at, created_at, identity_id, memory_id,
525                 instance_name, working_directory, display_hidden)
526             VALUES (?1, ?2, '', '', '', 'running', '', ?3, 0, ?3, ?4, ?5, ?6, ?7, ?8)",
527            params![
528                id,
529                definition_id,
530                created_at,
531                identity_id,
532                memory_id,
533                instance_name,
534                working_directory,
535                if display_hidden { 1_i64 } else { 0_i64 },
536            ],
537        )
538        .unwrap();
539    }
540
541    fn count_agents(conn: &Connection, where_clause: &str) -> i64 {
542        let sql = format!("SELECT COUNT(*) FROM db_agents WHERE {where_clause}");
543        conn.query_row(&sql, [], |row| row.get(0)).unwrap()
544    }
545
546    #[test]
547    fn round_trip_template_user_clone_and_instance_lifecycle() {
548        let mut conn = fresh_conn();
549
550        // 2 templates, 1 user-cloned definition (from template tpl-1),
551        // 3 instances:
552        //   - inst-A on template tpl-1 (named "Maks")
553        //   - inst-B on template tpl-2 (unnamed)
554        //   - inst-C on user-clone-def def-u1 (named "Custom")
555        insert_def(&conn, "tpl-1", "Coder", 1, "");
556        insert_def(&conn, "tpl-2", "Reviewer", 1, "");
557        insert_def(&conn, "def-u1", "Maks-the-Coder", 0, "tpl-1");
558        insert_instance(&conn, "inst-A", "tpl-1", "Maks", "id-1", "mem-1", "/wd/a", 100, false);
559        insert_instance(&conn, "inst-B", "tpl-2", "", "", "", "/wd/b", 200, false);
560        insert_instance(&conn, "inst-C", "def-u1", "Custom", "id-2", "mem-2", "/wd/c", 300, false);
561
562        let stats = run_consolidate_migration(&mut conn, None).unwrap();
563        assert_eq!(stats.templates_inserted, 2);
564        assert_eq!(stats.user_defs_inserted, 1);
565        assert_eq!(stats.instances_as_clone_inserted, 2);
566        assert_eq!(stats.instances_folded_into_def, 1);
567        assert_eq!(stats.instances_skipped_continuation, 0);
568
569        // Templates present, is_template=1, parent_template_id empty.
570        assert_eq!(count_agents(&conn, "is_template = 1"), 2);
571        for tpl in &["tpl-1", "tpl-2"] {
572            let parent: String = conn
573                .query_row(
574                    "SELECT parent_template_id FROM db_agents WHERE id = ?1",
575                    params![tpl],
576                    |r| r.get(0),
577                )
578                .unwrap();
579            assert_eq!(parent, "");
580        }
581
582        // 3 user-clone rows: def-u1 (folded from inst-C), inst-A, inst-B.
583        assert_eq!(count_agents(&conn, "is_template = 0"), 3);
584
585        // inst-A → parent_template_id = tpl-1, name = "Maks".
586        let (parent, name, identity): (String, String, String) = conn
587            .query_row(
588                "SELECT parent_template_id, name, identity_id FROM db_agents WHERE id = 'inst-A'",
589                [],
590                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
591            )
592            .unwrap();
593        assert_eq!(parent, "tpl-1");
594        assert_eq!(name, "Maks");
595        assert_eq!(identity, "id-1");
596
597        // inst-B (unnamed) → name falls back to template name "Reviewer".
598        let name: String = conn
599            .query_row(
600                "SELECT name FROM db_agents WHERE id = 'inst-B'",
601                [],
602                |r| r.get(0),
603            )
604            .unwrap();
605        assert_eq!(name, "Reviewer");
606
607        // def-u1 (user-clone) folded inst-C's bindings.
608        let (name, identity, memory): (String, String, String) = conn
609            .query_row(
610                "SELECT name, identity_id, memory_id FROM db_agents WHERE id = 'def-u1'",
611                [],
612                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
613            )
614            .unwrap();
615        assert_eq!(name, "Custom"); // instance_name overrode def name
616        assert_eq!(identity, "id-2");
617        assert_eq!(memory, "mem-2");
618    }
619
620    #[test]
621    fn multiple_instances_on_one_user_clone_keeps_most_recent_bindings() {
622        let mut conn = fresh_conn();
623        insert_def(&conn, "tpl-1", "Coder", 1, "");
624        insert_def(&conn, "def-u1", "User Coder", 0, "tpl-1");
625        // Two instances on def-u1; most recent has id-RECENT.
626        insert_instance(&conn, "inst-old", "def-u1", "Old", "id-OLD", "mem-OLD", "/wd/old", 100, false);
627        insert_instance(&conn, "inst-new", "def-u1", "New", "id-RECENT", "mem-RECENT", "/wd/new", 999, false);
628
629        let stats = run_consolidate_migration(&mut conn, None).unwrap();
630        assert_eq!(stats.instances_collision_warned, 1);
631
632        let identity: String = conn
633            .query_row(
634                "SELECT identity_id FROM db_agents WHERE id = 'def-u1'",
635                [],
636                |r| r.get(0),
637            )
638            .unwrap();
639        assert_eq!(identity, "id-RECENT", "most-recent instance's bindings must win");
640    }
641
642    #[test]
643    fn marker_short_circuits_second_run() {
644        let mut conn = fresh_conn();
645        insert_def(&conn, "tpl-1", "Coder", 1, "");
646        let tmp = tempfile::tempdir().unwrap();
647
648        let stats1 = run_consolidate_migration(&mut conn, Some(tmp.path())).unwrap();
649        assert_eq!(stats1.templates_inserted, 1);
650        assert!(!stats1.already_done);
651        assert!(tmp.path().join(CONSOLIDATE_MARKER).exists());
652
653        // Insert a NEW row after the marker; second call must NOT see
654        // it.
655        insert_def(&conn, "tpl-2", "Reviewer", 1, "");
656        let stats2 = run_consolidate_migration(&mut conn, Some(tmp.path())).unwrap();
657        assert!(stats2.already_done);
658        assert_eq!(stats2.templates_inserted, 0);
659        // db_agents only has tpl-1 — the post-marker insert is the
660        // dual-write hook's problem, not the backfill's.
661        assert_eq!(count_agents(&conn, "is_template = 1"), 1);
662    }
663
664    #[test]
665    fn skips_continuation_rows() {
666        let mut conn = fresh_conn();
667        insert_def(&conn, "tpl-1", "Coder", 1, "");
668        insert_instance(&conn, "inst-A", "tpl-1", "Original", "", "", "/wd/a", 100, false);
669        // Continuation row (parent_instance_id = inst-A).
670        conn.execute(
671            "INSERT INTO db_agent_instances
672                (id, definition_id, parent_instance_id, block_id, session_id, status,
673                 github_context, started_at, ended_at, created_at, identity_id, memory_id,
674                 instance_name, working_directory, display_hidden)
675             VALUES ('inst-cont', 'tpl-1', 'inst-A', '', '', 'running', '', 200, 0, 200,
676                     '', '', 'Original', '/wd/a', 0)",
677            [],
678        )
679        .unwrap();
680
681        let stats = run_consolidate_migration(&mut conn, None).unwrap();
682        assert_eq!(stats.instances_skipped_continuation, 1);
683        // Only inst-A and the template made it into db_agents.
684        assert_eq!(count_agents(&conn, "1 = 1"), 2);
685        assert_eq!(count_agents(&conn, "id = 'inst-cont'"), 0);
686    }
687
688    #[test]
689    fn empty_database_is_clean_noop() {
690        let mut conn = fresh_conn();
691        let stats = run_consolidate_migration(&mut conn, None).unwrap();
692        assert_eq!(stats, ConsolidateStats::default());
693        assert_eq!(count_agents(&conn, "1 = 1"), 0);
694    }
695
696    #[test]
697    fn preserves_hidden_flag_into_user_hidden() {
698        let mut conn = fresh_conn();
699        insert_def(&conn, "tpl-1", "Coder", 1, "");
700        insert_instance(&conn, "inst-H", "tpl-1", "Hidden", "", "", "/wd/h", 100, true);
701        run_consolidate_migration(&mut conn, None).unwrap();
702        let hidden: i64 = conn
703            .query_row(
704                "SELECT user_hidden FROM db_agents WHERE id = 'inst-H'",
705                [],
706                |r| r.get(0),
707            )
708            .unwrap();
709        assert_eq!(hidden, 1);
710    }
711}